 1.大数据高速计算引擎Spark（上）之Spark Core中RDD编程高阶下的序列化
   
   Spark原理
   在实际开发中会自定义一些对RDD的操作，此时需要注意的是：
       初始化工作是在Driver端进行的
       实际运行程序是在Executor端进行的
   这就涉及到了进程通信，是需要序列化的。
   可以简单的认为SparkContext代表Driver。
package cn.lagou.sparkcore

import org.apache.spark.{SparkConf, SparkContext}

class MyClass1(x: Int) {
  val num = x
}

case class MyClass2(num: Int)

class MyClass3(x: Int) extends Serializable {
  val num = x
}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    val conf = 
      new SparkConf().setAppName(
        this.getClass.getCanonicalName.init).setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    val rdd1 = sc.makeRDD(1 to 20)
    def add1(x: Int) = x + 100
    val add2 = add1 _
    // 过程和方法，都具备序列化能力

    //  rdd1.map(add1(_)).foreach(println)
    //  rdd1.map(add2(_)).foreach(println)
    // 普通的类不具备序列化能力
    // 下面的语句会报错，Task not serializable。错误的原因就是因为：
    //MyClass1 类不具备序列化能力
    val object1 = new MyClass1(20)
    // rdd1.map(x => object1.num + x).foreach(println)

    // 解决方案一：使用 case class
    val object2 = MyClass2(30)
    // rdd1.map(x => object2.num + x).foreach(println)

    // 解决方案二：实现 Serializable 接口
    val object3 = new MyClass3(30)
    //rdd1.map(x => object3.num + x).foreach(println)

    // 解决方案三：延迟创建
    println("解决方案三：延迟创建")
    lazy val object4 = new MyClass1(20)
    rdd1.map(x => object4.num + x).foreach(println)
  }
}
   备注：
        如果在方法、函数的定义中引用了不可序列化的对象，也会导致任务不能序列化
        延迟创建的解决方案较为简单，适用性广
 
 2.RDD依赖关系
   
   RDD只支持粗粒度转换，即在大量记录上执行的单个操作。将创建RDD的一系列
Lineage（血统）记录下来，以便恢复丢失的分区。
   RDD的Lineage会记录RDD的元数据信息和转换行为，当该RDD的部分分区数据丢失
时，可根据这些信息来重新运算和恢复丢失的数据分区。
   RDD和它依赖的父RDD（s）的关系有两种不同的类型，即窄依赖(narrow dependency)
和宽依赖(wide dependency)。 依赖有2个作用：其一用来解决数据容错；其二用来
划分stage。
   窄依赖：1:1 或 n:1
   宽依赖：n:m；意味着有 shuffle
   要能够准确、迅速的区分哪些算子是宽依赖；
   DAG(Directed Acyclic Graph) 有向无环图。原始的RDD通过一系列的转换就就形成
了DAG，根据RDD之间的依赖关系的不同将DAG划分成不同的Stage：
     对于窄依赖，partition的转换处理在Stage中完成计算
     对于宽依赖，由于有Shuffle的存在，只能在parent RDD处理完成后，才能开始接
下来的计算
     宽依赖是划分Stage的依据
   
   RDD任务切分中间分为：Driver programe、Job、Stage(TaskSet)和Task
     Driver program：初始化一个SparkContext即生成一个Spark应用
     Job：一个Action算子就会生成一个Job
     Stage：根据RDD之间的依赖关系的不同将Job划分成不同的Stage，遇到一个宽依赖
则划分一个Stage
     Task：Stage是一个TaskSet，将Stage划分的结果发送到不同的Executor执行即为一
个Task
     Task是Spark中任务调度的最小单位；每个Stage包含许多Task，这些Task执行的计算
逻辑相同的，计算的数据是不同的
     
	 注意：Driver programe->Job->Stage-> Task每一层都是1对n的关系。
	 
// 窄依赖
val rdd1 = sc.parallelize(1 to 10, 1)
val rdd2 = sc.parallelize(11 to 20, 1)
val rdd3 = rdd1.union(rdd2)
rdd3.dependencies.size
rdd3.dependencies

// 打印rdd1的数据
rdd3.dependencies(0).rdd.collect
// 打印rdd2的数据
rdd3.dependencies(1).rdd.collect

// 宽依赖
val random = new scala.util.Random
val arr = (1 to 100).map(idx => random.nextInt(100))
val rdd1 = sc.makeRDD(arr).map((_, 1))
val rdd2 = rdd1.reduceByKey(_+_)

// 观察依赖
rdd2.dependencies
rdd2.dependencies(0).rdd.collect
rdd2.dependencies(0).rdd.dependencies(0).rdd.collect

   再谈WordCount
val rdd1 = sc.textFile("/wcinput/wc.txt")
val rdd2 = rdd1.flatMap(_.split("\\s+"))
val rdd3 = rdd2.map((_, 1))
val rdd4 = rdd3.reduceByKey(_+_)
val rdd5 = rdd4.sortByKey()
rdd5.count

// 查看RDD的血缘关系
rdd1.toDebugString
rdd5.toDebugString

// 查看依赖
rdd1.dependencies
rdd1.dependencies(0).rdd
rdd5.dependencies
rdd5.dependencies(0).rdd

// 查看最佳优先位置
val hadoopRDD = rdd1.dependencies(0).rdd
hadoopRDD.preferredLocations(hadoopRDD.partitions(0))

# 使用 hdfs 命令检查文件情况
hdfs fsck /wcinput/wc.txt -files -blocks -locations